Introduction to Spark Cache
Preface
In Spark SQL, caching is commonly used when you need to reuse an intermediate computation result. Understanding how Spark cache works helps developers speed up computation and improve efficiency. This post builds a complete picture of Spark cache by answering three key questions: first, what Spark cache is, illustrated with short Scala examples; second, in which scenarios developers should use it; and finally, how it works under the hood, together with a few caveats drawn from my own experience.
What is Spark Cache?
Spark cache offers a way to store an intermediate computation result in memory or on disk so that it can be reused by later computations.
In Scala
1. cache()

```scala
val cachedDf = dataframe.select("col1", "col2").filter("age > 10").cache()
```
2. persist() & unpersist()

```scala
import org.apache.spark.storage.StorageLevel

val cachedDf = dataframe
  .select("col1", "col2")
  .filter("age > 10")
  .persist(StorageLevel.MEMORY_AND_DISK)

// ... use cachedDf in one or more actions ...

cachedDf.unpersist()
```
In SQL
```scala
spark.sql("CACHE TABLE table_name")
```
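Spark SQL also supports lazy caching and explicit uncaching; a minimal sketch, where table_name is a placeholder for an existing table or view:

```scala
// CACHE TABLE is eager by default; adding LAZY defers materialization
// until the table is first scanned.
spark.sql("CACHE LAZY TABLE table_name")

// Release the cached data when it is no longer needed.
spark.sql("UNCACHE TABLE table_name")
```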
Types of Spark Cache
There are two ways to cache data in Spark. One is the cache() method, which always uses the default storage level: MEMORY_ONLY for RDDs, and MEMORY_AND_DISK for Datasets and DataFrames. The other is persist(), which accepts an explicit storage level.
Cache (Source Code in Scala)

```scala
/**
 * Persist this Dataset with the default storage level (`MEMORY_AND_DISK`).
 */
def cache(): this.type = persist()
```

The RDD version is identical except that its default storage level is MEMORY_ONLY.
Persist
- DISK_ONLY: Persist data on disk only in serialized format.
- MEMORY_ONLY: Persist data in memory only in deserialized format.
- MEMORY_AND_DISK: Persist data in memory; if there is not enough memory, evicted blocks are stored on disk.
- OFF_HEAP: Persist data in off-heap memory. See spark.memory.offHeap.enabled in the Spark configuration docs.
More storage levels are defined in StorageLevel.scala.
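As a small sketch of choosing a level explicitly (dataframe stands for any DataFrame, as in the earlier examples):

```scala
import org.apache.spark.storage.StorageLevel

// Pick an explicit storage level instead of the default.
val diskOnlyDf = dataframe.persist(StorageLevel.DISK_ONLY)

// storageLevel reports the level currently in effect
// (StorageLevel.NONE if the DataFrame has not been persisted).
println(diskOnlyDf.storageLevel)
```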
When to use Spark Cache?
The rule of thumb for caching is to identify the DataFrame that you will be reusing in your Spark application and cache it. In other words, use Spark cache when the target DataFrame (an intermediate result) is consumed by two or more action operations, as in the sketch below.
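For instance, here is a minimal sketch of the "two or more actions" rule; the parquet paths and the amount column are hypothetical:

```scala
// Hypothetical input path.
val events = spark.read.parquet("/data/events")

// This intermediate result feeds two separate actions below, so cache it;
// otherwise each action would recompute it from the source files.
val bigSpenders = events.filter("amount > 100").cache()

val n = bigSpenders.count()                      // first action: materializes the cache
bigSpenders.write.parquet("/tmp/big_spenders")   // second action: served from the cache
```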
Benefit
- Improve I/O efficiency: reading data from HDFS files repeatedly is time consuming, while reading from the executors' memory or local disk is fast and stable.
- Provide a recovery point in the lineage: if a downstream RDD or partition is lost, Spark can recompute it from the cached DataFrame rather than from the original source data.
How Does Spark Cache Work?
- When you call cache() on a DataFrame, nothing happens to the data; the query plan is simply updated with a new operator, InMemoryRelation.
- Spark uses the Cache Manager to keep track of which computations have already been cached, in terms of their query plans. The Cache Manager phase happens after the analyzer and before the optimizer in the logical planning stage.
- When you call an action on a DataFrame, Spark checks whether some subquery has already been flagged as cached by comparing analyzed logical plans.
- The caching process itself is triggered only when the first action runs on a DataFrame whose plan uses the cached DataFrame as a subquery (see the sketch after this list).
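A quick way to observe this is to inspect the plan of a cached DataFrame (cachedDf refers to the DataFrame from the earlier examples):

```scala
// cache() only annotates the plan; nothing is materialized yet.
// explain(true) shows an InMemoryRelation / InMemoryTableScan node that the
// Cache Manager substituted for the cached subplan.
cachedDf.explain(true)

// The first action actually populates the cache.
cachedDf.count()
```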
Caution In Usage
- Prefer persist() with an explicit storage level such as MEMORY_AND_DISK over a bare cache(), so the storage behavior is visible in the code. Be especially careful when caching a very large Dataset or RDD with MEMORY_ONLY (the RDD default): keeping everything deserialized in memory puts heavy pressure on executor memory and can contribute to OOM (Out Of Memory) errors.
- Unpersist the cached DataFrame when it is no longer needed in subsequent computations to release memory (a table-level variant is sketched after this list).
- There are situations where caching does not help at all and can even slow down execution, for instance queries over large datasets stored in a columnar file format such as Parquet that supports column pruning and predicate pushdown: scanning the cached copy can be slower than an optimized scan that prunes columns and pushes filters down to the files. (See https://towardsdatascience.com/best-practices-for-caching-in-spark-sql-b22fb0f02d34)
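As a companion to unpersist() on DataFrames, the catalog API can release caches at the table or session level; a minimal sketch, again with table_name as a placeholder:

```scala
// Drop the cache for a single table created with CACHE TABLE.
spark.catalog.uncacheTable("table_name")

// Or clear every cached table and DataFrame in the current session.
spark.catalog.clearCache()
```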
Further Thoughts
- Difference between cache and checkpoint
- How Spark manages memory usage
Reference List
- Best Practices for Caching in Spark SQL: https://towardsdatascience.com/best-practices-for-caching-in-spark-sql-b22fb0f02d34
- Spark Configuration, Memory Management: https://spark.apache.org/docs/latest/configuration.html#memory-management
- RDD API (Scala): http://spark.apache.org/docs/latest/api/scala/org/apache/spark/rdd/RDD.html